译者注

在流式系统方面,网络上流传了两篇精彩的文章,Streaming 101Streaming102。这两篇文章的作者是Google流式系统的负责人Tyler Akidau,他是MillWheel与DataFlow的开发者,在流式系统方面十分权威。恰巧这两篇文章非常详细并且适合初学者,可以帮助我们理清流式系统的各种概念与面临的挑战,是十分难得的佳作。其中Streaming101已经有很多翻译的版本,质量参差不齐。官方的翻译版本参考这里,然而不幸的是它依然晦涩难懂。这里并不打算翻译101这篇文章,读者可以参考原文和译文进行学习,它主要介绍了流式系统的一些基本概念和困境,同时提出了一个核心观点:设计优良的流式系统完全可以代替批量系统,并从原理上分析了如何实现这一目标。在流式102这篇文章中,作者以实际的流式系统(DataFlow)的设计为例,列举了多个场景,具体讲解了如何设计流式系统,解决这些问题。这里将翻译流式102这篇文章,我会参考原文采用意译的方式,力求更好的帮助读者理解。

在阅读流式102之前,请务必通篇阅读流式101,理解核心概念,这里的思维导图仅供参考。Let's Go!

简介

欢迎回来!如果您错过了之前的《The world beyond batch: Streaming 101》,强烈推荐先阅读这篇文章,它是理解下文的基础。请注意:我们假设你已经熟悉了那些技术术语和概念。此外,本文有一些动画Gif,如果你用打印出来看就会丢失这些信息。

让我们进入正题。简要回顾一下,上一次我们主要讨论了3个主题:

这篇文章中,我们会接着深入探讨数据处理模式,但是会更加关注细节和具体的应用场景。本文主要分为两个章节:

当此文完结,我们就会掌握构建一个健壮的处理无序数据的系统所需的核心原理与概念。有一些时序推理工具可以让你真正地超越批量系统!😏

为了给你提供一些直观感受,我会使用DataFlow SDK的代码片段(例如:Google DataFlow API)。同时,我还用了一些动画来可视化一些概念。至于为何使用DataFlow而不是Spark Streaming或者Storm这类系统,其一是我更熟悉DataFlow,其二是其他系统并没有这种表现能力来展示我所涉及的例子(潜台词:功能不全面)。好消息是其他系统也在朝着这个方向努力,一个更大的好消息是Google现在已经向Apache基金会提出Propsal来建立一个DataFlow的孵化器项目(与Artisans, Cloudera, Talend等公司合作)。它的目标旨在以由DataFlow模型提供的强大的无序数据处理语意为基础来建立一个开源社区和生态。这会在2016年渐渐变得有趣,我跑题了。。。

抱歉,本文中没有上次我承诺的各个流式框架比较那部分。我错误估计了本文包含的内容以及时间,我不能再拖稿了。作为补偿,最后会附上我在2015年新加坡Hadoop大会上发表演讲《The evolution of massive-scale data processing》(在2016的六月会更新版本),它有一些材料就是我想讲的。同时,奉上一个精美的幻灯片,Enjoy。

现在,开始讨论流式!

回顾&&大纲

在流式101中,我首先澄清了一些技术术语:

紧接着我们又涉及了与无界数据处理相关的两个重要概念:

除了101中介绍的两个重要概念,本文我们还将引入另外三个概念

最后,为了更好地理解所有这些概念之间的关系,我们将重新回顾全文,并通过回答下面四个问题来温故知新。我提出的这几个问题都至关重要:

下文会详细讨论这些问题:我会用上一些配色以清晰地表示我们讨论的问题属于上面的哪一种 What/Where/When/How

流式101的精简版

首先,我们来回顾一下Streaming 101中提出的一些概念,但这次我将用一些详细的例子使得这些概念更具体。

What: transformations

经典批处理应用的转换操作已经回答了第一个问题:“计算逻辑是什么?”。尽管许多人可能已经熟悉经典的批处理,但我们将从那里开始,因为它是我们的基础,我将基于他添加一些其他概念。

这一节我们就看一个例子:在一个简单的数据集(如10个值)上按key聚合求SUM。你可以把它想象成某个手游里面通过把个人的分数求和来计算团队总分,或者其他计费类应用或者Monitor的应用等。

对于后面的每一个例子我都会用Dataflow的伪代码来定义具体的Pipeline。这将是伪代码,在某种意义上,我有时会稍稍改变规则,使示例更清晰,去除不必要细节(如使用的具体I / O源)或简化命名(Java中的当前触发器名称是否繁琐,为了清晰起见,我将使用更简单的名称)。除了这些小事情之外(大部分我在后文注释中明确列举),它基本上是真实可用的Dataflow SDK代码。稍后对那些对他们可以编译和运行自己的类似示例感兴趣的人,我还将提供一个链接到实际的代码。

如果你熟悉Spark Streaming或者Flink,那么比较容易理解DataFlow的代码,下面简单介绍DataFlow的两个基本原语:

20241212105125385

如果有疑问或者想查看DataFlow的文档,看这里

为了讲解例子的方便,我们这里使用了一个名叫**input**的PCollection<KV<String, Integer>>作为输入(input是由String/Integer作为键/值对组成的,String是球队名,Interger是每人的分数)。在实际的Pipeline中,input一般从I/O源读入原始数据,然后解析日志数据成Key/Value对,最终转换成PCollection<KV<String, Integer>>。我会在第一个示例中写出所有代码,但是在接下来的其他示例中隐去I/O相关的部分。

因此,一个例子:要求先从数据源中解析出team/score键值对,然后对每个team求和,算出球队总分。代码如下:

// Listing 1. Summation pipeline
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
  .apply(Sum.integersPerKey());

对于下面的所有例子,我们都会先分析代码,然后用动画展示一个数据集的运行过程。具体点说,就是含有一个key和10条数据的pipeline的执行过程。在实际运行时,会有多台机器并行执行相同的操作。这里仅仅是为了简单清晰地讲解例子。

所有动画都会在两个维度上绘制输入和输出:事件时间(X轴)和处理时间(Y轴)。因此,从pipeline的视角来看,白色的粗线条从底部向上移动代表了真实时间的移动。输入用圆圈表示,圆圈内的数字表示该记录的值。他们一开始是灰色的,pipeline实际观测到他们后会改变颜色。

随着pipeline观测到这些值,会逐渐在状态(State)中累加他们,并最终实际输出结果。状态的变换和输出都由矩形表示,聚合值在矩形上面,被矩形覆盖的区域表示该部分的事件时间/处理时间已经累积计算到了结果中。在批处理引擎中执行Listing 1中的pipeline代码,运行过程如下(请注意,您需要点击/点击下面的图像才能启动动画,然后再循环直到再次点击/再点击):

图2:传统的批量处理过程

由于这是一个批量的pipeline,因此,只有接收到所有的input值,系统才会累加状态(由图中顶部的绿色虚线表示),最终产生了唯一的输出:51。而且,这个例子由于没有使用任何Window,我们计算了所有Event-Time内的值的和。所以,图中用于表示状态和输出的矩形覆盖了整个X轴。但是,如果我们要处理一个无界数据源,那么传统的批量处理就不行了;我们不能等待输入数据结束,因为它实际上永远不会结束。因此,我们需要引入一个在Streaming101中提及的概念:窗口。在回答第二个问题,“计算什么时间范围的数据?”之前,我们先简要回顾一下窗口。

Where: windowing

如上次讨论的那样,窗口化是沿着时间边界分割数据源的过程。常见的窗口划分策略包括固定窗口,滑动窗口和会话窗口。

20241212105125455

看一个实际的例子:把上面的求和的 pipeline 划分为 2 分钟的固定时间窗口。使用 DataflowSDK,添加一个 Window.into transform 操作即可:

// Listing 2. Windowed summation code.
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))) // 这一行
  .apply(Sum.integersPerKey());

回想一下,由于语义上批处理只是流式的一个子集,所有 Dataflow 提供了一种统一的批处理和流式模型。因此,我们首先在批量引擎上执行这个 pipeline,这种机制看起来更直观。当我们切换为流式引擎时,可以直接与这个进行比较。

图4:在批量引擎上对窗口进行求和

如前所述,直到输入数据被完全读入,状态才会累加,之后产生输出。但是,在这种情况下,我们得到的不是一个输出值,而是四个输出值:每个输出对应着 2 分钟的事件时间窗口。

至此,我们回顾一下 Streaming 101 中介绍的两个主要概念:事件时间(Event-Time)与处理时间(Processing-Time)的关系以及窗口化(Windowing)。如果我们想深入学习,我们需要增加本节开头提到的新概念:水印(Watermark),触发器(Trigger)和累积(Accumulation)。因此开始 Streaming 102。

流式 102

我们刚刚看到了如何在批量引擎上运行窗口化的 pipeline。但是,我们更希望系统有更低的延迟并且可以原生支持处理无界数据源。切换到流式引擎是正确的方向!但问题是,在批量引擎上我们清楚地知道什么时间点数据完整了(比如:当有限数据源的数据全都读入的时候),但是对于无界的数据源,我们缺乏一种有效的方式来判断数据完整性(Completeness)。因此,引入了 Watermark

When: watermarks

Watermark 可以给出"何时将计算结果输出?"这个问题的一半答案:Watermark 是在 Event-Time 域上的时间概念,用来刻画输入完整性。(这句话说明,watermark 首先是表示的是一个 Event-Time 时间)。换句话说,它们是系统以 Event-Time 为尺度来衡量事件流中 Record 处理进度/完整性的方式(不管是无界数据还是有界数据都适用,显然在无界的情况下更有用)。

回想一下 Streaming 101 中的这个图,这里稍作修改,其中描述了事件时间和处理时间之间的偏差(skew),在真实的分布式系统中,这个偏差会随时间不断变化。

20241212105125671

上面这个红色曲线就是真实的 Watermark,随着 Processing-Time 的推移,他描述了 Event-Time 纬度的完整性的过程。你可以把 Watermark 看成是 F (P) -> E 的函数:输入是 Processing-Time,输出是 Event-Time。(确切的说,函数的输入是在 pipeline 中被观测到的 Watermark 这一点的所有上游的当前状态:输入源,缓冲数据,正在处理的数据等;但在概念上,将其视为从 Processing-Time 到 Event-Time 的映射更为简单。)在 Event-Time 上的这一点 E 表示:系统相信在 E 之前的所有数据都被观测到了。换句话说,系统『确信』不会再有 Event-Time<E 的数据出现了。根据这种『确信』是不是严格保证或者仅仅是猜想,我们把 Watermark 分为两种类型:完美 Watermark 与启发式 Watermark。

Watermark 是一个有意思而且复杂的话题,超出我的讨论范围,期望未来有时间写个帖子讨论它。现在,为了更好地了解 Watermark 的作用和缺陷,我们使用 Watermark 的流式引擎,来看看 Listing 2 中的 pipeline 代码何时将计算结果输出。左边的例子使用完美的 Watermark,右边的使用了启发式 Watermark。

[图6:在流式引擎上分别使用完美watermark(左)与启发式watermark(右)进行分割窗口求和的运算](https://www.youtube.com/watch?v=JKpe_wUyUWg)

这两种情况都是 watermark 到达窗口终点时结果被输出(注:看图中 watermark 的先与12:02 12:04 12:06 12:08的交点)。主要的不同在于:右侧启发式算法没有计算『9』这个值,这对 watermark 曲线的形态影响巨大。这个例子突出了 watermark 的两个缺点(不仅 watermark,任何一种完整性概念都有类似的问题),具体来说:

在 Streaming 101 中,我强调过:仅仅想使用完整性(Completeness)这个概念来健壮地处理无序无界的数据流还远远不够。Watermark 太慢或太快这两大缺点就是我这个观点的依据。仅仅依靠完整性的系统是不能同时获得低延迟正确性的,解决这些问题的关键是引入触发器(Trigger)。

When: The wonderful thing about triggers, is triggers are wonderful things!(触发器是个好东西)

"何时将计算结果输出?"这个问题的另一半答案就是触发器。触发器声明了一个窗口的计算结果什么 Processing-Time 时间被输出?(但是,触发器自身做出决定可能依据的是其他 Event-Time 时间域发生了什么,比如 watermark 线的进度)。窗口内的每次特定输出被称为窗口的窗格(pane)。

触发 Trigger 的信号包括下面这些:

除了使用某个特定信号的简单触发器之外,还有组合触发器,允许创建更复杂的触发逻辑。组合触发器包括:

为了让 Trigger 这个概念更具体,让我们明确地表示出图 6 中(也是 Listing 2 的代码中)隐含的触发器:

// Listing 3. Explicit default trigger.
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(AtWatermark())) // 这一行
  .apply(Sum.integersPerKey());

考虑到这一点,基于对触发器功能的基本了解,我们可以思考来解决 watermark 太慢或太快的问题。对于这两个问题,我们基本上希望对于一个窗口可以定期更新输出值(即除了水印线到达窗口终点之外的更新)。因此,我们需要一些**重复触发(Repetitions)**器。那么这个问题就变成了:重复触发的条件是什么?

对于输出太慢的情况(即希望提供早期的推测结果的情况),我们可以假设给定窗口的早期阶段,流入数据量是稳定且不完整的。因此按照 Processing-Time 周期性触发(例如 1 分钟一次)是明知。因为触发的次数与数据量无关,最坏的情况就是获得稳定不变的触发数据流。

对于输出太快的情况(即对于启发式 watermark 可以处理迟到的数据),我们可以假定 watermark 是基于比较准确的启发式算法(这个加上还是比较靠谱的)。在这种前提下,我们不会经常看到 late data 到达,但是如果出现了这种情况,我们必须快速处理更新输出的结果(例如,看到 late data,立刻更新)。因为我们假设这种情况很少见,所有他不会使得系统过载。

注意,这里只是举了一个例子,你可以根据自己的情况自由选择触发的条件(比如只在上面某一种情况下触发,或者都不触发)

最后,我们需要编排好各种触发器的时间:比 watermark 早触发,在 watermark 达到时触发还是比 watermark 晚触发?我们可以通过一个 Sequence 触发器和一个特殊的 OrFinally 触发器来实现,OrFinally 触发器可以安装一个子 Trigger,当子 Trigger 触发时终止父 Trigger。

// Listing 4. Manually specified early and late firings.
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(Sequence( // 这两行
                 Repeat(AtPeriod(Duration.standardMinutes(1)).OrFinally(AtWatermark()), 
                 Repeat(AtCount(1))))
  .apply(Sum.integersPerKey());

但是,这样写起来有些麻烦。由于这种 repeated-early | on-time | repeated-late 的模式十分常见,我们在 Dataflow API 中提供了特定的简写方式(语意上等价的)。

// Listing 5. Early and late firings via the early/late API.
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) // 这两行
                   .withLateFirings(AtCount(1))))
  .apply(Sum.integersPerKey());

在流式引擎上执行 Listing 4/5 的代码(使用完美 watermark 和启发式 watermark)产生的结果如下:

图7.在安装了early/late触发器的流式引擎上进行窗口求和运算

这个版本比图 6 的版本有两处明显的提升:

一个意外的惊喜是:新加入的触发器机制让两种类型的 watermark 算法产生了统一的输出。对比图 6 的两个算法版本还截然不同,在这里的他们却十分相似。

这两个版本还有一个很大的区别:他们窗口的生命周期不同。完美 watermark 的案例中,我们知道 watermark 通过窗口终点后,系统不会再看到该窗口的任何数据了,因此我们可以关闭当前窗口的所有状态。在启发式 watermark 案例中,我们仍然需要让窗口等待一段时间来处理 late data。但是问题是:到目前为止,我们没有办法知道需要保留窗口多长时间。这里我们需要引入『允许迟到时间』这个概念(也叫 horizon)。

When: allowed lateness (垃圾回收,何时关闭 Window)

在进入最后一个问题"后续数据的处理结果如何影响之前的处理结果?"之前,我们要讨论一下处理长期无序数据数据流的流式系统必备的一个功能:垃圾回收。图 7 的启发式算法 watermark 的例子中,每个窗口的状态在该示例的整个生命周期内都会保持。为了处理 late data,这么干是必要的。但是,在实际环境中,当处理无限数据源时,无限期地保持窗口状态(包括元数据)是不切实际的,我们最终会耗尽磁盘空间。

因此,任何实际的无序处理系统都需要提供一些限制窗口生命周期的方法。一个简单的办法是在系统内定义一个允许数据迟到的视界(horizon,理解成时间范围) — 例如:对记录数据可以影响处理流程的时间(相对于 watermark)进行限制;任何在这个时间点后到达的数据都会被简单地抛弃。一旦你划定了允许数据可能有多晚到达,你就准确地确定了窗口状态需要保持的最长时间,这段时间就是 watermark 线到达窗口终点线之后再继续等待的时间。此外还给予系统尽快丢弃超过 horizon 的数据的自由,这意味着不要在我们不关心的数据上浪费任何资源。

译者注:什么时间关闭窗口(垃圾回收)?
Watermark 超过窗口结束的延迟时间(EventTime_diff)。(我们定义)即传入当前 ProcessTime,算出 EventTime 2,我们指定一个 EventTime_diff,对于 EventTime 1 的窗口,如果 EventTime 2-EventTime 1>=EventTime_diff, 就可以关闭之前的窗口了。

为什么用 watermark(Event-Time)来做呢?而不是 Processing-Time(比如在 watermark 到了之后等待 n 秒钟的 Processing-Time)?
参考文章给出的注释:因为可能有其他原因导致系统崩溃延迟等等,使得 processTime 就这么过去了,窗口过早地关闭!

由于『允许迟到时间』和『watermark』之间的相互作用有点微妙,所以值得再举一个例子。我们来看一下 Lising 5 /图 7 中的启发式 watermark 的 pipeline 的例子,我们添加一分钟的允许迟到时间(视界,horizon)(请注意,这个特定的 horizon 被选择是因为适合图标展示;在实际工程中,更大的 horizon 可能会更好一些):

// Listing 6. Early and late firings with allowed lateness.
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .withAllowedLateness(Duration.standardMinutes(1))) // 这一行
  .apply(Sum.integersPerKey());

上面这个 pipeline 的执行如下面的图 8 所示,我给图标添加了下面这些特性来强调『允许延迟时间』的影响:

图8. 运行在流式引擎上,具有早期和晚期启动并且允许1分钟延迟的窗口求和

关于『允许延迟时间』(horizon)的最后两点说明:

好了,让我们进入第四个也是最后一个问题。

How: accumulation

随着时间的推移,触发器会在一个 Window 中产生多个 Pane。我们遇到了最后一个问题:『后续数据的处理结果如何影响之前的处理结果?』。迄今为止的例子中,每个窗格的新数据都建立在紧邻的前一个窗格的数据之上。但是,实际上有三种不同的更新模式。(注释:事实上有四种模式,第四种是丢弃并更正--discarding and retracting,这并不常用,这里不会讨论它)

要使用丢弃模式,我们将对 Listing 5 进行以下更改:

// Listing 7. Discarding mode version of early/late firings.
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .discardingFiredPanes()) // 这一行
  .apply(Sum.integersPerKey());

在具有启发式 watermark 的流式引擎上再次运行代码将产生如下所示的输出:

[图9. 流式引擎上的含有early/later的触发器的丢弃模式版本](https://fast.wistia.net/embed/iframe/64r8oawoc2?videoFoam=true&wvideo=64r8oawoc2)

虽然输出的整体形状类似于图 7 的累计模式,但请注意,此丢弃版本中的任何一个窗格都不重叠。因此,每个输出与其他输出是独立的。

如果我们实际看看更正/回撤模式,代码修改类似(但是,请注意,Google Cloud Dataflow 的回撤模式仍然处于开发状态,所以这个 API 中的命名有点推测,不太可能与他相同):

// Listing 8. Accumulating & retracting mode version of early/late firings.
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes()) // 这一行
  .apply(Sum.integersPerKey());

在流式引擎上运行,结果如下:

图9. 流式引擎上的含有early/later的触发器的更正模式版本

由于窗口内的窗格相互关联,不太容易看清这些回撤值。回撤值用红色标记了,他与蓝色的窗格重叠,所以看起来像是紫色。每个窗格的两个输出值用逗号分割使他们更容易区分。

让我们把图 7(只有启发式的版本),9,10 放到一起比较一下:

20241212105125848

你可以想到,在存储和计算成本方面,丢弃模式,累计模式,累计&更正模式成本是不断增加的。为此,积累模式的选择也是对正确性,延迟和成本进行权衡。

插曲

至此,我们已经回答了所有四个问题:

但是我们只看了一种类型的窗口化例子:Event-Time 的固定窗口。通过流式 101 我们知道,还有许多方式进行窗口化,今天我还想讨论其中的两种:Processing-Time 的固定窗口Event-Time 的 Session 窗口

When/Where: Processing-time windows

Processing-Time 窗口十分重要有两个原因:

因此,值得深入了解 Processing-Time 窗口和 Event-Time 窗口之间的不同,特别是考虑到当今大多数流式系统中 Processing-Time 窗口已经被普及。

当我们面对的模型是严格使用 Event-Time 作为基础时(例如本文的例子),有两种方式来获得 Processing-Time 窗口:

请注意,这两种方法基本等价,但是在有多个 Stage 的 pipeline 的情况下,它们略有不同:在触发器版本,每个 Stage 都是独立地分割 process-time 窗口,因此,在第一个 Stage 的窗口 X 中出现的数据,可能在下一个 Stage 的窗口 X-1 或者窗口 X+1 中出现。而在使用入口时间作为 Processing-Time 的版本中,一旦将数据并入到窗口 X 中,那么在整个 pipeline 中都会在窗口 X 中,这是由于不同 stage 会通过 watermark(在 DataFlow 情况下,Spark Streaming 是通过 micro-batch 的边界来同步)或其他引擎级别的方式来同步处理进度。

**使用 Processing-Time 划分窗口的最大缺点就是:当输入数据被观测到是顺序发生改变时,窗口的内容就改变了。**为了用更具体的方式研究这一点,我们比较下面三种场景:

我们会在这三种场景上分别使用两个不同的数据集(所以,一共会有 2*3 中情况)。这两个数据集除了观测到的顺序不同,其他均完全相同(比如相同的值发生相同的 Event-Time)。第一套数据集就是我们之前一直看到的那个顺序(白色标记)。第二套所有的值都在 Processing-Time 的轴上移动了(如图 12 所示,使用紫色标记)。你可以想象一下,只要发生一点意外(比如使用复杂的分布式系统就会打乱顺序),这种情况就会发生。

图12 - 移动输入值在Processing-Time上的位置,同时保持相同的值和Event-Time

Event-time windowing(使用 Event-Time 窗口化)

为了建立一个比较基线,我们先看一下使用了 Event-Time 固定窗口搭配启发式 watermark 的引擎分别作用到这两个数据集上的结果。我们复用 Listing 5、图 7 中使用了 early/later 触发器的代码,得到的结果如下。左侧和我们以前看到的结果一样,右侧是使用了第二种顺序的数据集的结果。重点是:虽然输出过程的形状不一样,但是最终四个窗口的输出结果全部相同:14,22,3 和 12。

图13.在Event-time固定窗口上使用两个不同Processing-Time顺序的数据集的结果

Processing-time windowing via triggers(使用 Trigger 实现的 Processing-Time 窗口化)

好,现在让我们来比较一下使用了 Processing-Time 的两种方案。首先是 Trigger 方式。我们从三个角度来看这种方式的 Processing-Time 窗口是如何工作的:

相应的代码如 Listing 9,注意:全局窗口是默认行为,,因此无需指定窗口化策略:

// Listing 9. Processing-time windowing via repeated, discarding panes of a global event-time window.
PCollection<KV<String, Integer>> scores = input
  .apply(Window.triggering(
                  Repeatedly(AtPeriod(Duration.standardMinutes(2))))
               .discardingFiredPanes())
  .apply(Sum.integersPerKey());

当在流式引擎上运行上述代码分别处理上述两组不同观测顺序的数据集,结果如下面图 14 所示,一些值得注意的点有:

[图14,通过触发器实现的Processing-Time窗口,分别在两组不同观测顺序的数据集上运行](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102?wvideo=befc8n62yh#F6)

Processing-time windowing via ingress time(使用入口时间实现的 Processing-Time 窗口化)

最后,让我们看看使用输入数据的入口时间作为 Event-Time 来实现 Processing-Time 窗口的方案. 在代码方面,这里有四个方面值得一提:

// Listing 10. Explicit default trigger.
PCollection<String> raw = IO.read().withIngressTimeAsTimestamp();
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
  .apply(Sum.integersPerKey());

在流式引擎上的执行过程如下图 15 所示。当数据到达时,它们的 Event-Time 被更新以匹配它们的入口时间(即到达时的 Processing-Time),随着 Processing-Time 推移,完美的 watermarker 线向右水平移位。该图中注意点:

图15,在相同输入的两种不同处理时间排序中,通过使用入口时间进行处理时间窗口划分

虽然使用这两种方式实现 Processing-Time 窗口很有趣,但是:这里你要明白的最重要的一点是:Event-Time 窗口至少在运算结果上是输入数据顺序无关无关(在输入完成之前,实际的 Panel 会不断变化),而 Processing-Time 窗口却不是这样。如果你关系事件发生的真实时间,必须使用 Event-Time 窗口,否则计算结果毫无意义!!

Where: session windows(会话窗口)

我们就要讲完所有的例子啦。。。如果你读到这里,真的很有耐心。好消息是你的耐心没有白费,下面我会讲一个我最喜欢的特性:数据驱动的动态窗口—Session。请看好了。

Session 是一种比较特殊的窗口,它会用某段不活动的间隔为界来捕获一段时间内的事件。在数据分析领域,这种窗口特别有用:它能从用户视角提供用户某段时间内参与了哪些活动。在 Session 内看到关联的活动,根据会话的长度推断出用户的参与度。

从窗口化的视角,Sesiion 在下面这两方面特别有意思:

在某些场景下:对于单个 Session 内的数据,我们可以在时间的前面加一个通用的标志符来标记它属于某个 Session(例如:视频播放器的携带服务质量信息心跳包 ping。对于某次观看,可以在所有的 ping 的时间前面加上 Session ID)。那么,这个例子的 Session 十分容易构建了,只要把数据按照 Key 聚合就是一个 Session。

但是,一些更通用的场景(例如:Session 自己没法知道它已经开始了)需要这样:Session 必须根据数据在时间内的位置来构建。在无序数据集的情况下,这十分棘手。

要提供通用的 Session 支持,一个核心的洞察是:一个 Session Window 可以看成是相互重叠的小 Window 的组合,其中这里的小 Window( proto-session window)比较特别,他们只含有一个 Record,窗口大小是 inactivity 的时间。因此,即使我们看到的数据是无序的,我们可以随着数据一个一个来到,简单地把这些有重叠的小 Window 合并起来组成一个最终的 Session Window。

20241212105125947

我们来看一个代码示例,通过使用 Listing 8 的代码(包含了 early/late 触发器和 retract 的更新机制)来构建 Session:

// Listing 11. Early and late firings with session windows and retractions
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))) // 这一行
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

在流式引擎上允许结果如下图 17:

Figure 17 - 具有early/late触发器和retract更新模式的Sessions窗口,一个生动的例子

上图内容是否丰富,我们一起看一下:

这很强大!更令人惊叹的是:在这么一个模型中描述他们是这么地简单。这个模型把流处理的按纬度分解成可区分可组合的部分的。最后,我们只要关系业务逻辑,而不用操心怎么把数据组合成有用的形式。

如果你不觉得这有多好,看看这篇文章如何使用Spark Streaming手动建立Session 有多痛苦就懂了(注意:我没有职责他们,Spark其实已经做了不错了,现在已经有人努力去写文章说明如何在 Spark 之上建立一个特定的 Session)。当然,他们也没有合适的 Event-Time Session,也没有提供 early, late 的触发机制,还没有 retract 的更新模式。

终结篇,感觉好极了!

就这些了,讲完了所有的例子,撒花,撒花!你已经有了健壮的流式处理的基础知识,可以出发进入流式的世界。在结束之前,我快速回顾一个我们所涉及的内容,以免你错过了什么。首先,我们所涉及的主要概念:

其次,我们研究了下面四个问题(我承诺这是最后一次说它们)

最终,深入这种流式处理模型的提供的灵活性(因为最终我们就是平衡准确性-Correctness,延迟-Latency 和成本-Cost, 这些因素关系)。回顾一下:我们只要修改一点点代码就能实现在相同数据集上各种不同的产出:

图 18. 相同输入数据,对应 9 种不同的产出

20241212105126183 Classic batch: Listing 1 / Figure 2 20241212105126276 Fixed windows batch: Listing 2 / Figure 4 20241212105126519 Fixed windows streamingwatermark: Listing 2 / Figure 6
20241212105126612 Early/late discarding: Listing 7 / Figure 9 20241212105126813 Early/late accumulatingListings: 4 & 5 / Figure 7 20241212105126929 Early/late retracting: Listing 8 / Figure 10
20241212105127034 Processing-time (triggers): Listing 9 / Figure 14 20241212105127145 Processing-time: (ingress time) Listing 10 / Figure 15 20241212105127231 Sessions: Listing 11 / Figure 17

图 18:相同输入数据,对应 9 种不同的产出. Credit: Tyler Akidau.

感谢您的耐心和兴趣,我们下次再见!

后记

额外的资源

文章中一些与现实不同的地方(略)

致谢(略)

注脚(略)

部分翻译到文章中。

作者简介

Tyler Akidau 是 Google 的软件工程师。目前,是 Google 内部流式数据处理系统(例如“MillWheel”)的技术主管,他花了五年时间研究大型流式数据处理系统。他热衷于将流数据处理视为更为一般的大规模计算模型。他最喜欢的交通工具是可以带着他两个小女儿的 cargo bike。

参考文章: